#!/usr/bin/env python
# -*- coding:utf-8 -*-
# 大数据看板实时统计，布控预警，后处理模块
# 作者：王成
# 日期：2017-04-18[未完成]
#import MySQLdb
import redis
import json
import Queue
import yaml,logging
from logging.handlers import TimedRotatingFileHandler,RotatingFileHandler
import _strptime
import requests
from datetime import datetime,date,timedelta
import time,timeit,sys,os,threading
from handle import HANDLE
from daemon import Daemon
reload(sys)
sys.setdefaultencoding('utf-8')

class MyDaemon(Daemon):            
    def analyse(self):
        """Worker loop: fan out queued capture records, one at a time.

        For every record taken from ``self.MSG_QUEEN`` the loop:

        1. publishes the record to every topic in ``self.pubsub_all``;
        2. asks this worker's matcher in ``self.handle_pool`` which tasks the
           record hits; a hit on a style-5 (tracking) task is appended to the
           Redis list ``ZZ:<task_id>``, any other hit is reported to the web
           API (``i_bk`` followed by ``send_msg``);
        3. publishes the record to the ``self.pubsub_part`` topics selected by
           this worker's matcher in ``self.pubsub_pool``.

        The current thread's name must be the integer index of this worker;
        it selects the matcher pair.  The loop never returns: any failure is
        logged and the record being processed is dropped.
        """
        tid = int(threading.current_thread().name)
        pipe_pubsub = self.redis_pubsub.pipeline()
        web_cfg = self.config['mysql_web']
        api_base = "http://" + web_cfg['host']
        # Without a timeout a stalled web API would block this worker forever
        # while the queue keeps growing.  Optional config key, default 10 s.
        http_timeout = web_cfg.get('timeout', 10)
        while 1:
            try:
                msg = self.MSG_QUEEN.get(timeout=1)
                if not msg:
                    continue
                capture_time_unix = msg['capture_time_unix']
                license_plates = [msg['license_plate'].encode('utf-8', "ignore"),
                                  msg['license_plate2'].encode('utf-8', "ignore")]
                # The published message carries 'source_id' instead of
                # 'front_rear' and no unix timestamp.
                del msg['capture_time_unix']
                msg['source_id'] = msg['front_rear']
                del msg['front_rear']
                # msg is not modified below, so serialize it only once.
                payload = json.dumps(msg)

                # ---------------- subscriptions to all data ----------------
                # .values() takes a snapshot: the monitor thread adds and
                # removes entries concurrently, and iterating the live dict
                # could raise "dictionary changed size during iteration".
                for topic in self.pubsub_all.values():
                    pipe_pubsub.publish(topic, payload)
                pipe_pubsub.execute()

                # ---------------- watch-list & tracking --------------------
                task_ids = self.handle_pool[tid].process(
                    license_plates=license_plates,
                    lp_color_id=int(msg['plate_type_id2']),
                    brand_id=int(msg['brand_id']),
                    model_id=int(msg['model_id']),
                    year_id=int(msg['year_id']),
                    level_id=int(msg['level_id']),
                    capture_time=capture_time_unix,
                    location_id=str(msg['location_id']),
                    direction_id=msg['direction_id'],
                    device_id=str(msg['device_id']))

                for task_id, match_lp_id in task_ids.items():
                    task_no = int(task_id)
                    style_id = self.handle_has_add.get(task_no)
                    if not style_id:
                        continue
                    if int(style_id) == 5:
                        logging.info('追踪[%d],入redis', task_no)
                        d = {"task_id": task_id, "info_id": msg['info_id'], "capture_time": msg['capture_time']}
                        # RPUSHX only appends to an existing list; 0 means the
                        # list is gone, which is treated as "nobody is
                        # reading this task any more".
                        rc = self.redis_slow.rpushx("ZZ:" + str(task_id), json.dumps(d))
                        if int(rc) == 0:
                            logging.info('任务[%d]长时间无人调阅,强制关闭', task_no)
                            requests.get(api_base + "?c=common&m=web_api&token=6733352d99e61e9aecc1ca694f5e6758&action=u_bk&task_id=%d&status=%d" % (task_no, -2), timeout=http_timeout)
                    else:
                        logging.info('布控[%d],入mysql', task_no)
                        requests.get(api_base + "?c=common&m=web_api&token=6733352d99e61e9aecc1ca694f5e6758&action=i_bk&task_id=%d&info_id=%s&direction_id=%s&location_id=%s&year_id=%d&image_url=&plate_number=%s&plate_number2=%s&match_lp_id=%d&capture_time=%s" % (task_no, msg['info_id'], str(msg['direction_id']), str(msg['location_id']), int(msg['year_id']), license_plates[0], license_plates[1], match_lp_id, msg['capture_time']), timeout=http_timeout)

                        # cap_id is formatted with %d, so it needs the integer
                        # form of the task id like every other use above.
                        requests.get(api_base + "?c=common&m=send_msg&token=6733352d99e61e9aecc1ca694f5e6758&capture_time=%d&location_id=%s&license_plate=%s&license_plate2=%s&match_lp_id=%d&direction_id=%s&year_id=%d&cap_id=%d" % (capture_time_unix, str(msg['location_id']), license_plates[0], license_plates[1], match_lp_id, str(msg['direction_id']), int(msg['year_id']), task_no), timeout=http_timeout)

                # ---------------- filtered subscriptions -------------------
                publish_ids = self.pubsub_pool[tid].process(
                    level_id=int(msg['level_id']),
                    capture_time=capture_time_unix,
                    location_id=msg['location_id'],
                    direction_id=msg['direction_id'],
                    source_id=int(msg['source_id']))
                for publish_id in publish_ids:
                    # pubsub_part is keyed by int; normalize the key the same
                    # way the task lookup above does.
                    # NOTE(review): assumes the matcher returns numeric ids.
                    topic = self.pubsub_part.get(int(publish_id))
                    if topic:
                        pipe_pubsub.publish(topic, payload)
                pipe_pubsub.execute()
            except Queue.Empty:
                time.sleep(0.1)
                continue
            except Exception as e:
                logging.exception('处理布控时错误: %s', str(e))
                
    def monit_db_change(self):
        """Poll the web API every 10 seconds and mirror its tasks and subscriptions.

        Each cycle first synchronizes the watch-list / tracking tasks
        (``action=q_bk``) and then the data subscriptions (``action=q_pub``)
        into ``self.handle_has_add`` / ``self.handle_pool`` and
        ``self.pubsub_part`` / ``self.pubsub_all`` / ``self.pubsub_pool``.
        The two phases are isolated from each other, so a malformed task
        record cannot keep the subscriptions from being refreshed.  The loop
        never returns; failures are logged.
        """
        while True:
            for sync in (self._sync_tasks, self._sync_subscriptions):
                try:
                    sync()
                except Exception as e:
                    logging.exception('读取布控单时错误: %s', str(e))
            time.sleep(10)

    def _web_api(self, query):
        """GET the ``web_api`` endpoint with ``query`` appended; return the response."""
        web_cfg = self.config['mysql_web']
        # NOTE(review): the API token is hard-coded in source.
        url = "http://" + web_cfg['host'] + "?c=common&m=web_api&token=6733352d99e61e9aecc1ca694f5e6758&" + query
        # A timeout keeps a stalled web API from freezing this thread forever.
        # Optional config key, default 10 s.
        return requests.get(url, timeout=web_cfg.get('timeout', 10))

    def _fetch_records(self, action):
        """Return the decoded JSON list for ``action``, or ``[]`` on any failure."""
        resp = self._web_api("action=" + action)
        try:
            if resp.status_code == requests.codes.ok:
                # A JSON null/false body is treated as "no records".
                return json.loads(resp.text) or []
            # Not inside an exception handler, so there is no traceback to
            # attach: log with error(), not exception().
            logging.error('读取任务时错误:[%s],%s', resp.status_code, resp.text)
        except Exception as e:
            logging.exception('读取任务时错误:%s,%s', str(e), resp.text)
        return []

    @staticmethod
    def _split_ids(raw, cast=None, strip=False):
        """Split a comma-separated string into a list; empty input gives ``[]``.

        ``strip`` removes leading/trailing commas first; ``cast`` (e.g. ``int``)
        is applied to every element when given.
        """
        if not raw:
            return []
        if strip:
            raw = raw.strip(",")
        parts = raw.split(',')
        if cast is None:
            return parts
        return [cast(part) for part in parts]

    @staticmethod
    def _normalize_schedule(record):
        """Convert the validity dates and the daily time window to integers, in place.

        ``valid_period_b`` / ``valid_period_e`` ('YYYY-MM-DD') become local
        unix timestamps, the end one moved to the end of that day;
        ``handle_b`` / ``handle_e`` ('HH:MM:SS') become seconds since midnight.
        """
        for key, extra in (('valid_period_b', 0), ('valid_period_e', 86400)):
            day = datetime.strptime(record[key], "%Y-%m-%d")
            record[key] = int(time.mktime(day.timetuple())) + extra
        for key in ('handle_b', 'handle_e'):
            clock = record[key]
            record[key] = int(clock[0:2]) * 3600 + int(clock[3:5]) * 60 + int(clock[6:8])

    def _drop_task(self, task_id):
        """Forget ``task_id`` and remove it from every matcher, if it was added."""
        if task_id in self.handle_has_add:
            del self.handle_has_add[task_id]
            logging.info('删除布控[%d]', task_id)
            for handle in self.handle_pool:
                handle.del_task(task_id)

    def _sync_tasks(self):
        """Apply the current state of every task reported by ``action=q_bk``."""
        for task in self._fetch_records('q_bk'):
            status = int(task['status'])
            if status == 1:
                self._apply_active_task(task)
            elif status in (-2, -1):
                # The task was closed: drop it and set its status to 3.
                task_id = int(task['task_id'])
                self._drop_task(task_id)
                self._web_api("action=u_bk&task_id=%d&status=%d" % (task_id, 3))

    def _apply_active_task(self, task):
        """Expire, register or keep alive one task whose status is 1."""
        self._normalize_schedule(task)
        task_id = int(task['task_id'])
        if task['valid_period_e'] < int(time.time()):
            logging.info('过期[%d]', task_id)
            self._drop_task(task_id)
            self._web_api("action=u_bk&task_id=%d&status=%d" % (task_id, 3))
            return

        style_id = int(task['style_id'])
        list_key = "ZZ:" + str(task['task_id'])
        if task_id in self.handle_has_add:
            if style_id == 5:
                # RPUSHX returns 0 once the tracking list no longer exists,
                # which is treated as "nobody is reading this task any more".
                rc = self.redis_slow.rpushx(list_key, "")
                if int(rc) == 0:
                    logging.info('任务[%d]长时间无人调阅,强制关闭', task_id)
                    self._web_api("action=u_bk&task_id=%d&status=%d" % (task_id, -2))
            return

        # New task: turn the comma-separated filters into lists before the
        # whole record is handed to the matchers as JSON.
        task['license_plates'] = self._split_ids(task['license_plates'])
        task['lp_color_ids'] = self._split_ids(task['plate_type_id'])
        task['direction_ids'] = self._split_ids(task['direction_ids'])
        task['location_ids'] = self._split_ids(task['location_ids'], strip=True)
        # Stray leading/trailing commas are stripped so they cannot make
        # int('') fail and abort the cycle (same as for subscriptions).
        for key in ('level_ids', 'brand_ids', 'model_ids', 'year_ids'):
            task[key] = self._split_ids(task[key], cast=int, strip=True)

        self.handle_has_add[task_id] = style_id
        if style_id == 5:
            logging.info('添加追踪[%d]', task_id)
            # Create the tracking list with a placeholder and a 60 s expiry.
            self.redis_slow.rpush(list_key, "")
            self.redis_slow.expire(list_key, 60)
        else:
            logging.info('添加布控[%d]', task_id)
        encoded = json.dumps(task)
        for handle in self.handle_pool:
            handle.add_task(task_id, encoded)

    def _drop_subscription(self, publish_id):
        """Remove ``publish_id`` from the filtered and the full subscription tables."""
        if publish_id in self.pubsub_part:
            del self.pubsub_part[publish_id]
            logging.info('删除定量订阅[%d]', publish_id)
            for pubsub in self.pubsub_pool:
                pubsub.del_task(publish_id)
        if publish_id in self.pubsub_all:
            # Both tables are keyed by int; deleting with the raw (string)
            # id from the API raised KeyError on every cycle.
            del self.pubsub_all[publish_id]
            logging.info('删除全量订阅[%d]', publish_id)

    def _sync_subscriptions(self):
        """Apply the current state of every subscription reported by ``action=q_pub``."""
        for sub in self._fetch_records('q_pub'):
            status = int(sub['status'])
            if status == 1:
                self._apply_active_subscription(sub)
            elif status == 2:
                publish_id = int(sub['publish_id'])
                self._drop_subscription(publish_id)
                self._web_api("action=u_pub&publish_id=%d" % publish_id)

    def _apply_active_subscription(self, sub):
        """Expire or register one subscription whose status is 1."""
        self._normalize_schedule(sub)
        publish_id = int(sub['publish_id'])
        if sub['valid_period_e'] < int(time.time()):
            logging.info('过期[%d]', publish_id)
            self._drop_subscription(publish_id)
            self._web_api("action=u_pub&publish_id=%d" % publish_id)
            return
        if publish_id in self.pubsub_part or publish_id in self.pubsub_all:
            return  # already registered

        sub['source_id'] = int(sub['source_id'])
        sub['level_ids'] = self._split_ids(sub['level_ids'], cast=int, strip=True)
        sub['location_ids'] = self._split_ids(sub['location_ids'], strip=True)
        sub['direction_ids'] = self._split_ids(sub['direction_ids'], strip=True)

        unfiltered = not (sub['level_ids'] or sub['location_ids'] or sub['direction_ids'])
        if sub['source_id'] == 0 and unfiltered:
            # No filter at all: this is a subscription to all data.
            logging.info('添加全量订阅[%d]', publish_id)
            self.pubsub_all[publish_id] = sub['topic']
            return

        self.pubsub_part[publish_id] = sub['topic']
        logging.info('添加定量订阅[%d]', publish_id)
        encoded = json.dumps(sub)
        for pubsub in self.pubsub_pool:
            pubsub.add_task(publish_id, encoded)
            
    def counter(self):
        lua_get = """
        --redis.call('SELECT', 1)
        local mytable = {}
        --过车总数
        local gczs = redis.call('GET', KEYS[1]..':gczs')
        --redis.call('EXPIREAT',KEYS[1]..':gczs',1355292000)
        mytable['gczs'] = gczs
        --车牌总数
        local cpzs = redis.call('PFCOUNT', KEYS[1]..':cpzs')
        mytable['cpzs'] = cpzs
        --各地车牌总数
        local gdcpzs = redis.call('HGETALL', KEYS[1]..':gdcpzs')
        mytable['gdcpzs'] = {}
        local len = table.getn(gdcpzs)
        if(len>0)
        then
            for i = 1, len, 2 do
                mytable['gdcpzs'][tostring(gdcpzs[i])] = gdcpzs[i+1]
            end
        end
        --车辆类型总数
        local cllxzs = redis.call('HGETALL', KEYS[1]..':cllxzs')
        mytable['cllxzs'] = {}
        local len = table.getn(cllxzs)
        if(len>0)
        then
            for i = 1, len, 2 do
                mytable['cllxzs'][tostring(cllxzs[i])] = cllxzs[i+1]
            end
        end

        if(tonumber(KEYS[2])>0)
        then
            --区过车总数
            local qgczs = redis.call('GET', KEYS[1]..':'..KEYS[2]..':qgczs')
            mytable['qgczs'] = qgczs
            --区车牌总数
            local qcpzs = redis.call('PFCOUNT', KEYS[1]..':'..KEYS[2]..':qcpzs')
            mytable['qcpzs'] = qcpzs
            --区各地车牌总数
            local qgdcpzs = redis.call('HGETALL', KEYS[1]..':'..KEYS[2]..':qgdcpzs')
            mytable['qgdcpzs'] = {}
            local len = table.getn(qgdcpzs)
            if(len>0)
            then
                for i = 1, len, 2 do
                    mytable['qgdcpzs'][tostring(qgdcpzs[i])] = qgdcpzs[i+1]
                end
            end
            --区车辆类总数
            local qcllxzs = redis.call('HGETALL', KEYS[1]..':'..KEYS[2]..':qcllxzs')
            mytable['qcllxzs'] = {}
            local len = table.getn(qcllxzs)
            if(len>0)
            then
                for i = 1, len, 2 do
                    mytable['qcllxzs'][tostring(qcllxzs[i])] = qcllxzs[i+1]
                end
            end      
        end

        local re = cjson.encode(mytable)  
        return re
        --local cur = 0
        --repeat    
        --    redis.call('SETEX', cur, 400,400)
        --    cur=cur+1
        --until( tonumber(cur) ==4000000 )
        """
        multi_get = self.redis_counter.register_script(lua_get)
        
        
        #multi_get(keys=[20170503,370211],args=[])#keys:日期 区域     
        
    def run(self):
        """Daemon main loop: count every capture record in Redis and queue it.

        Loads ``config.yaml`` from this file's directory, sets up a rotating
        log file, connects the four Redis clients, starts the monitor thread
        (``monit_db_change``) and one ``analyse`` worker, then keeps popping
        JSON records from the ``_jobs`` list of ``redis_quick``.  Each record
        updates the per-day counters through a Lua script and is put on
        ``self.MSG_QUEEN`` for the worker.

        Returns on Ctrl+C; exits the process with status 1 if start-up fails.
        """
        self.MSG_QUEEN = Queue.Queue(0)
        config_path = os.path.dirname(os.path.abspath(__file__)) + '/config.yaml'
        # 'with' closes the file even when the YAML cannot be parsed.
        with open(config_path) as config_file:
            self.config = yaml.safe_load(config_file)

        name = 'yisa_counter'
        logging.basicConfig(level=logging.INFO)
        handler = RotatingFileHandler('/var/log/%s.log' % name, maxBytes=134217728, backupCount=7)
        formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
        handler.setFormatter(formatter)
        logging.getLogger('').addHandler(handler)
        logging.info('启动 [%s]', name)

        redis_quick = redis.StrictRedis(unix_socket_path=self.config['redis_quick']['unix_socket_path'])
        self.redis_slow = redis.StrictRedis(unix_socket_path=self.config['redis_slow']['unix_socket_path'])
        self.redis_pubsub = redis.StrictRedis(unix_socket_path=self.config['redis_pubsub']['unix_socket_path'])
        self.redis_counter = redis.StrictRedis(unix_socket_path=self.config['redis_counter']['unix_socket_path'])
        # Counter update script.  KEYS[1] is the day key; ARGV is
        # (capture time, plate, region, location, vehicle level, plate colour).
        lua_set = """
        --过车总数
        redis.call('INCR', KEYS[1]..':gczs')
        --redis.call('EXPIREAT',KEYS[1]..':gczs',1355292000)
        --区过车总数
        redis.call('INCR', KEYS[1]..':'..ARGV[3]..':qgczs')
        --redis.call('EXPIREAT',KEYS[1]..':'..ARGV[3]..':qgczs',1355292000)

        if(ARGV[2]~='无牌')
        then
            --车牌总数
            redis.call('PFADD', KEYS[1]..':cpzs', ARGV[2])
            --区车牌总数
            redis.call('PFADD', KEYS[1]..':'..ARGV[3]..':qcpzs', ARGV[2])
            --各地车牌总数
            redis.call('HINCRBY', KEYS[1]..':gdcpzs',string.sub(ARGV[2],0,4), 1)
            --区各地车牌总数
            redis.call('HINCRBY', KEYS[1]..':'..ARGV[3]..':qgdcpzs',string.sub(ARGV[2],0,4), 1)
        end

        if(tonumber(ARGV[5])>0)
        then
            --车辆类型总数
            redis.call('HINCRBY', KEYS[1]..':cllxzs',ARGV[5], 1)
            --区车辆类型总数
            redis.call('HINCRBY', KEYS[1]..':'..ARGV[3]..':qcllxzs',ARGV[5], 1)
        end

        """
        multi_set = self.redis_counter.register_script(lua_set)
        # Example: multi_set(keys=[20170503], args=[1493774429, '鲁BF6H50', '370211', 370211000003, 1, 1])

        try:
            self.handle_pool = []
            self.pubsub_pool = []
            self.handle_has_add = {}
            self.pubsub_part = {}
            self.pubsub_all = {}
            t = threading.Thread(target=self.monit_db_change)
            t.setDaemon(True)
            t.start()
            for x in xrange(1):
                # One matcher pair per worker; the thread name is the index
                # analyse() uses to pick its pair.
                self.handle_pool.append(HANDLE())
                self.pubsub_pool.append(HANDLE())
                t = threading.Thread(target=self.analyse, name=str(x))
                t.setDaemon(True)
                t.start()

            logging.warning('开始分析...')
            while 1:
                try:
                    row = redis_quick.lpop('_jobs')
                    if not row:
                        time.sleep(1)
                        continue
                    result = json.loads(row)
                    dt = datetime.strptime(result['capture_time'], '%Y-%m-%d %H:%M:%S')
                    capture_time_unix = int(time.mktime(dt.timetuple()))
                    result['capture_time_unix'] = capture_time_unix
                    # Day key YYYYMMDD, built from the parsed time so that it
                    # is zero-padded whatever the input looked like.
                    k = dt.strftime('%Y%m%d')
                    license_plate = result['license_plate'].encode('utf-8', "ignore")
                    # A plate that is empty or starts with an ASCII byte is
                    # counted as "no plate".  The empty check matters: indexing
                    # [0] on '' raised IndexError and dropped the record.
                    if not license_plate or ord(license_plate[0]) < 128:
                        license_plate = '无牌'
                    region_id = str(result['region_id'])
                    location_id = str(result['location_id'])
                    level_id = str(result['level_id'])
                    plate_type_id = str(result['plate_type_id2'])
                    multi_set(keys=[k], args=[capture_time_unix, license_plate, region_id, location_id, level_id, plate_type_id])
                    self.MSG_QUEEN.put(result)
                except KeyboardInterrupt:
                    logging.error('Ctrl+C,终止运行')
                    return
                except redis.RedisError as e:
                    # Redis trouble: back off before trying again.
                    logging.exception('连接redis时错误: %s', str(e))
                    time.sleep(10)
                except Exception as e:
                    # A malformed record is not a connection problem: log it
                    # under its own message and move on without stalling.
                    logging.exception('解析过车数据时错误: %s', str(e))
        except Exception as e:
            logging.exception('取数据时错误: %s', str(e))
            sys.exit(1)
    
if __name__ == "__main__":
    # Command-line control of the daemon through its pid file.
    # Exit status: 0 after a recognised command, 2 on bad usage.
    # (For foreground debugging, call daemon.run() directly instead.)
    daemon = MyDaemon('/var/run/yisa_counter.pid')
    if len(sys.argv) != 2:
        print("usage: %s start|stop|restart" % sys.argv[0])
        sys.exit(2)

    commands = {
        'start': daemon.start,
        'stop': daemon.stop,
        'restart': daemon.restart,
    }
    action = commands.get(sys.argv[1])
    if action is None:
        print("Unknown command")
        sys.exit(2)
    action()
    sys.exit(0)